iT邦幫忙

2025 iThome 鐵人賽

DAY 17
1
Rust

DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅系列 第 17

Day 17: ExecutionPlan 體系架構 Part 2 - Stream 執行模型

  • 分享至 

  • xImage
  •  

前言

在昨天的文章中,我們探討了 ExecutionPlan trait 的核心設計,了解了 schema()properties()execute() 等關鍵方法。今天我們要深入探討 execute() 方法背後的 Stream 執行模型

今天的學習目標是:

  1. 理解 SendableRecordBatchStream 的設計與作用
  2. 掌握 Pull-based Volcano 執行模型的工作原理
  3. 了解 RecordBatch 在執行計劃間的流動過程
  4. 認識 TaskContext 在執行期間提供的支援
  5. 學習 Rust 異步流如何實現背壓(Backpressure)處理

這些概念是理解 DataFusion 查詢執行引擎的關鍵,也是實現自訂執行算子的基礎。

SendableRecordBatchStream - 可傳遞的數據流

型別定義與設計理念

在 DataFusion 中,查詢執行的結果不是一次性返回所有數據,而是通過一個異步流(Stream)逐批次地產生。這個流的型別就是 SendableRecordBatchStream

// datafusion/execution/src/stream.rs
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;

這個型別定義看似簡單,但包含了幾個重要的設計考量:

1. RecordBatchStream Trait

RecordBatchStream 是一個擴展了 Rust 標準 Stream trait 的特化 trait:

/// Trait for types that stream RecordBatch
pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
    /// Returns the schema of this RecordBatchStream.
    fn schema(&self) -> SchemaRef;
}

它的核心特點是:

  • 繼承 Stream<Item = Result<RecordBatch>>:每次從流中取出一個 RecordBatch,或遇到錯誤
  • 提供 schema() 方法:即使流中還沒有數據,也能提前知道數據的結構(Schema)
  • Schema 保證一致性:流中所有的 RecordBatch 必須符合同一個 Schema

這種設計讓下游算子能夠在開始處理數據前就規劃好記憶體分配和轉換邏輯,而不需要等待第一筆數據到達。

2. Pin 與 Box 的組合

Pin<Box<...>> 的組合提供了:

  • Box:堆積分配,實現 trait object 的動態分派,允許不同的執行算子返回不同的具體流類型
  • Pin:固定記憶體位置,這對於自引用的異步結構體是必要的(Rust 異步機制的要求)

3. Send Trait Bound

Send trait bound 表示這個流可以安全地在執行緒間傳遞,這是 DataFusion 實現並行執行的基礎。每個分區(partition)的執行可以在不同的執行緒或異步任務中進行。

RecordBatchStreamAdapter - 創建流的便利工具

在實際實作中,DataFusion 提供了 RecordBatchStreamAdapter 來簡化流的創建:

use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use futures::stream;

// 創建一個簡單的流
let schema = Arc::new(Schema::new(vec![
    Field::new("id", DataType::Int32, false),
]));

let batches = vec![
    RecordBatch::try_new(schema.clone(), vec![...])?,
];

// 將 Vec<RecordBatch> 轉換為流
let stream = stream::iter(batches.into_iter().map(Ok));
let adapter = RecordBatchStreamAdapter::new(schema, stream);
let sendable_stream: SendableRecordBatchStream = Box::pin(adapter);

這個 adapter 封裝了實作 RecordBatchStream trait 所需的樣板代碼,讓實作者只需專注於數據生成邏輯。

Pull-based Volcano 執行模型

Volcano 模型的歷史與核心概念

歷史背景

Volcano 模型由 Goetz Graefe 在 1990 年的論文 "Volcano - An Extensible and Parallel Query Evaluation System" 中提出,是資料庫查詢執行引擎設計的里程碑。這個模型奠定了現代資料庫系統的基礎架構,包括 PostgreSQL、MySQL、Apache Spark 和 DataFusion 等都受其影響。

核心思想

Volcano 模型將查詢執行計劃組織成一棵算子樹,每個算子都實作統一的介面:

trait Operator {
    fn open();           // 初始化算子
    fn next() -> Tuple;  // 獲取下一個元組
    fn close();          // 清理資源
}

關鍵特點:

  1. 迭代器模式:每個算子都像一個迭代器,通過 next() 方法產生數據
  2. 按需計算:只有在 next() 被調用時才進行計算
  3. 管道執行:數據以元組(tuple)為單位在算子間流動

Pull-based vs Push-based:兩種執行範式

在查詢執行引擎中,有兩種主要的數據流動範式:

Pull-based(拉取式)- Volcano 模型

┌─────────────────┐
│   Parent Op     │ ──┐
│  (consumer)     │   │ 1. "我需要數據"
└────────┬────────┘   │    parent.next()
         │            │
         │ 2. 調用    │
         ↓            │
┌─────────────────┐   │
│   Child Op      │ ──┘ 3. "這是數據"
│  (producer)     │      return tuple
└─────────────────┘

特點:父算子主動拉取數據,控制權在消費方。

Push-based(推送式)

┌─────────────────┐
│   Child Op      │ ──┐ 1. "這是數據"
│  (producer)     │   │    child.produce()
└────────┬────────┘   │
         │            │
         │ 2. 推送    │
         ↓            │
┌─────────────────┐   │
│   Parent Op     │ ──┘ 3. "已接收"
│  (consumer)     │      parent.consume()
└─────────────────┘

特點:子算子主動推送數據,控制權在生產方。

對比分析:Pull-based vs Push-based

特性 Pull-based (Volcano) Push-based
控制流 從上到下(父節點控制) 從下到上(子節點控制)
函數調用 parent → child.next() child → parent.consume()
背壓處理 自然支援(不拉取就不產生) 需要額外機制(緩衝隊列)
記憶體使用 按需分配,易於控制 可能需要大量緩衝區
LIMIT 優化 容易(停止拉取即可) 困難(已產生的數據難以停止)
管道並行 較難(受迭代器限制) 較易(可並行推送)
快取友好性 一般 較好(批次處理)
實作複雜度 簡單直觀 較複雜(需要狀態管理)

具體範例對比

假設查詢:SELECT name FROM users WHERE age > 18 LIMIT 10;

Pull-based 執行流程

// pseudocode
impl LimitExec {
    fn next() -> Option<Tuple> {
        if self.count >= 10 {
            return None;  // 已達 LIMIT,停止拉取
        }
        let tuple = self.input.next()?;  // 向下拉取
        self.count += 1;
        Some(tuple)
    }
}

// 優勢:一旦達到 LIMIT,立即停止
// FilterExec 和 TableScan 不會被繼續調用

Push-based 執行流程

// pseudocode
impl TableScan {
    fn produce() {
        for tuple in self.data {
            if !self.parent.consume(tuple) {
                break;  // 需要檢查父節點是否還需要數據
            }
        }
    }
}

// 劣勢:需要額外的信號機制來停止生產

DataFusion 的選擇:Pull-based Volcano 模型

在 DataFusion 中,Volcano 模型的體現為:

┌─────────────────────────────────┐
│     ProjectionExec              │
│   (需要數據時呼叫 poll_next)     │
└──────────────┬──────────────────┘
               │ poll_next()
               ↓
┌──────────────────────────────────┐
│        FilterExec                │
│  (從輸入拉取,過濾後返回)          │
└──────────────┬───────────────────┘
               │ poll_next()
               ↓
┌──────────────────────────────────┐
│       TableScan                  │
│    (從存儲讀取數據)               │
└──────────────────────────────────┘

DataFusion 為什麼選擇 Pull-based?

DataFusion 選擇 Pull-based Volcano 模型基於以下考量:

1. 自然的背壓機制(Backpressure)

這是 Pull-based 最大的優勢。在處理大規模數據時:

情境:快速生產 vs 慢速消費

Pull-based:
TableScan (快) ──X──> Filter (慢) ──X──> Consumer (很慢)
            ↑ 不被調用          ↑ 調用次數少
            不產生數據          只在需要時拉取

Push-based:
TableScan (快) ──→→→→ Buffer ──→ Filter (慢) ──→ Consumer (很慢)
            不斷推送    ↑ 記憶體爆炸風險
                       需要大量緩衝區

實際影響

  • Pull-based:記憶體使用穩定,與查詢複雜度無關
  • Push-based:可能需要數 GB 的緩衝區來存儲快速生產的數據

2. 延遲計算與短路優化(Lazy Evaluation & Short-circuit)

考慮這些常見查詢:

-- 場景 A: LIMIT 查詢
SELECT * FROM billion_rows_table LIMIT 10;

-- 場景 B: 提前退出
SELECT * FROM users WHERE username = 'admin';  -- 唯一索引

Pull-based 的優勢:

// LimitExec 達到 10 行後
fn poll_next(...) -> Poll<Option<RecordBatch>> {
    if self.produced >= self.limit {
        return Poll::Ready(None);  // 直接結束
    }
    // 不再調用 input.poll_next()
    // TableScan 自動停止讀取
}

Push-based 則需要:

  • 顯式的取消信號
  • 已經在管道中的數據仍需處理
  • 更複雜的狀態同步

3. 與 Rust 異步生態的完美結合

Rust 的 Stream trait 天生就是 pull-based:

pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) 
        -> Poll<Option<Self::Item>>;
}

這意味著:

  • 可以直接使用 futures crate 的豐富工具鏈(map, filter, try_flatten 等)
  • 與 Tokio 異步運行時無縫整合
  • 利用 Rust 的零成本抽象,沒有額外的運行時開銷

4. 簡化的錯誤處理

Pull-based 的錯誤傳播非常直觀:

// 子算子發生錯誤
fn poll_next(...) -> Poll<Option<Result<RecordBatch>>> {
    match self.read_data() {
        Err(e) => return Poll::Ready(Some(Err(e))),  // 立即返回錯誤
        Ok(data) => // ...
    }
}

// 父算子接收錯誤
match ready!(self.input.poll_next_unpin(cx)) {
    Some(Err(e)) => return Poll::Ready(Some(Err(e))),  // 向上傳播
    // ...
}

錯誤會自然地向上傳播,整個查詢會 fail-fast。

Push-based 則需要:

  • 回調函數來處理錯誤
  • 複雜的錯誤狀態管理
  • 可能需要清理已推送但未處理的數據

5. 除錯與性能分析更容易

Pull-based 的調用棧清晰可追蹤:

LimitExec::poll_next()
  └─> ProjectionExec::poll_next()
        └─> FilterExec::poll_next()
              └─> TableScan::poll_next()

可以輕鬆使用 Rust 的 profiler(如 cargo flamegraph)來分析性能瓶頸。

Push-based 的適用場景

儘管 DataFusion 選擇了 Pull-based,Push-based 在某些場景下仍有優勢:

  1. 高度並行的數據處理:Apache Flink、RayDB 等流處理系統
  2. 批次處理優化:可以一次推送大量數據,提高快取命中率
  3. 複雜的事件驅動架構:需要多個消費者同時處理同一數據流

混合模型

一些現代系統採用混合策略:

  • DuckDB:主要是 Pull-based,但在某些算子(如 Hash Join 的 build phase)使用 Push-based
  • Velox (Meta):可配置的執行模型,根據查詢特性動態選擇

實際範例:FilterExec 的 execute 實作

讓我們看一個真實的例子,了解 Pull-based 模型如何運作:

// datafusion/physical-plan/src/filter.rs
impl ExecutionPlan for FilterExec {
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
        
        // 1. 先執行輸入算子,獲得輸入流
        let input_stream = self.input.execute(partition, context)?;
        
        // 2. 創建 FilterExecStream,包裝輸入流
        Ok(Box::pin(FilterExecStream {
            schema: self.schema(),
            predicate: Arc::clone(&self.predicate),
            input: input_stream,  // 保存輸入流的引用
            baseline_metrics,
            projection: self.projection.clone(),
        }))
    }
}

注意這裡的關鍵點:

  1. execute() 方法本身不是異步的,它只是設置好流的結構並立即返回
  2. 真正的計算發生在返回的流被輪詢(poll)時
  3. 這符合 Volcano 模型的精神:在需要時才拉取數據

RecordBatch 的流動過程

poll_next - 數據流動的心臟

在 Rust 的異步系統中,Stream trait 的核心是 poll_next() 方法。每次父算子需要數據時,會呼叫子算子流的 poll_next()

pub trait Stream {
    type Item;
    
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>>;
}

Poll 是一個列舉,表示異步操作的狀態:

  • Poll::Ready(Some(batch)):成功產生一個 RecordBatch
  • Poll::Ready(None):流已結束,沒有更多數據
  • Poll::Pending:數據尚未就緒,需要等待(例如等待 I/O)

具體範例:CoalesceBatchesStream

讓我們看一個實際的 poll_next 實作,了解數據如何在算子間流動:

// datafusion/physical-plan/src/coalesce_batches.rs
impl Stream for CoalesceBatchesStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        loop {
            match &self.inner_state {
                CoalesceBatchesStreamState::Pull => {
                    // 1. 向輸入流請求下一個 batch
                    let input_batch = ready!(self.input.poll_next_unpin(cx));
                    
                    match input_batch {
                        Some(Ok(batch)) => {
                            // 2. 將 batch 加入合併緩衝區
                            match self.coalescer.push_batch(batch) {
                                CoalescerState::Continue => {
                                    // 繼續拉取更多 batch
                                    continue;
                                }
                                CoalescerState::TargetReached => {
                                    // 緩衝區已滿,準備返回
                                    self.inner_state = 
                                        CoalesceBatchesStreamState::ReturnBuffer;
                                }
                                CoalescerState::LimitReached => {
                                    self.inner_state = 
                                        CoalesceBatchesStreamState::Exhausted;
                                }
                            }
                        }
                        None => {
                            // 輸入流已結束
                            self.inner_state = 
                                CoalesceBatchesStreamState::Exhausted;
                        }
                        Some(Err(e)) => return Poll::Ready(Some(Err(e))),
                    }
                }
                CoalesceBatchesStreamState::ReturnBuffer => {
                    // 3. 返回合併後的 batch
                    match self.coalescer.finish()? {
                        Some(batch) => {
                            self.inner_state = CoalesceBatchesStreamState::Pull;
                            return Poll::Ready(Some(Ok(batch)));
                        }
                        None => {
                            self.inner_state = CoalesceBatchesStreamState::Pull;
                        }
                    }
                }
                CoalesceBatchesStreamState::Exhausted => {
                    // 4. 返回最後的緩衝數據
                    return Poll::Ready(self.coalescer.finish()?.map(Ok));
                }
            }
        }
    }
}

這個範例展示了幾個重要的模式:

  1. 狀態機設計:使用內部狀態(Pull / ReturnBuffer / Exhausted)管理執行流程
  2. 級聯拉取:通過 self.input.poll_next_unpin(cx) 向上游請求數據
  3. 批次處理:累積多個小 batch 合併成大 batch,提高效率
  4. 錯誤傳播:遇到錯誤時立即向上傳播,中止執行

數據流的完整生命週期

讓我們追蹤一個查詢中 RecordBatch 的完整流動路徑:

SELECT name FROM users WHERE age > 18 LIMIT 10;

執行計劃可能是:

LimitExec: fetch=10
  ProjectionExec: [name]
    FilterExec: age > 18
      TableScan: users

數據流動過程:

1. 用戶呼叫 stream.next().await
   │
   ↓
2. LimitExec.poll_next()
   │ 判斷是否已滿足 LIMIT
   ↓ 若未滿足,向下游請求
3. ProjectionExec.poll_next()
   │ 向下游請求完整的 RecordBatch
   ↓
4. FilterExec.poll_next()
   │ 向下游請求原始數據
   ↓
5. TableScan.poll_next()
   │ 從存儲讀取數據(可能是 Parquet 檔案)
   │ 返回 Poll::Ready(Some(batch_raw))
   ↑
6. FilterExec 接收到 batch_raw
   │ 應用過濾條件 age > 18
   │ 返回 Poll::Ready(Some(batch_filtered))
   ↑
7. ProjectionExec 接收到 batch_filtered
   │ 只保留 name 列
   │ 返回 Poll::Ready(Some(batch_projected))
   ↑
8. LimitExec 接收到 batch_projected
   │ 檢查行數,可能截斷 batch
   │ 返回 Poll::Ready(Some(batch_limited))
   ↑
9. 最終結果返回給用戶

整個過程中,數據是逐批次流動的,不需要將整個結果集載入記憶體。

TaskContext - 執行期的資源管理者

回顧與深入

在 Day 10 的文章中,我們介紹過 DataFusion 的三層架構(SessionContext → SessionState → TaskContext),並了解到 TaskContext 是執行層的輕量級上下文。今天我們要從執行期間的實際應用角度,深入探討 TaskContext 如何支撐查詢的運行。

每次呼叫 execute() 方法時,都會傳入一個 Arc<TaskContext>,它在執行期間扮演著關鍵的資源管理者角色。讓我們看看它在實際執行中提供哪些具體支援:

執行期間的核心功能

1. 記憶體池管理(Memory Pool)

let memory_pool = context.memory_pool();
// 嘗試分配記憶體
memory_pool.try_grow(required_size)?;

記憶體池確保單一查詢不會耗盡系統記憶體,並在必要時觸發 spilling(將數據寫入磁碟)。

2. 提供函數註冊表

當執行計劃中包含 UDF(User-Defined Function)時,可以通過 TaskContext 查找函數實作:

let scalar_func = context.scalar_functions()
    .get("my_custom_func")
    .ok_or_else(|| DataFusionError::Plan("Function not found".to_string()))?;

3. 訪問運行時環境(RuntimeEnv)

let runtime = context.runtime_env();
// 獲取對象存儲、磁碟管理器等
let object_store = runtime.object_store(...)?;

這使得執行算子能夠訪問外部資源,如讀取 S3 上的 Parquet 檔案。

4. 配置訪問

let batch_size = context.session_config()
    .batch_size();

執行算子可以根據配置調整行為,例如決定每個 RecordBatch 的大小。

在執行計劃中的傳遞方式

TaskContext 被包裝在 Arc 中,在整個執行計劃樹中共享:

impl ExecutionPlan for FilterExec {
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,  // 接收共享的 TaskContext
    ) -> Result<SendableRecordBatchStream> {
        // 訪問記憶體池
        let memory_pool = context.memory_pool();
        
        // 獲取配置
        let batch_size = context.session_config().batch_size();
        
        // 向下傳遞給輸入算子
        let input = self.input.execute(partition, Arc::clone(&context))?;
        
        // 創建執行流...
    }
}

這種設計確保:

  • 統一的資源管理:所有算子共享同一個記憶體池,避免某個算子過度消耗記憶體
  • 配置一致性:整個查詢使用相同的執行配置(如 batch_size)
  • 並行安全:多個分區可以安全地共享同一個 TaskContext,因為內部的記憶體池等組件已經處理好並發訪問

背壓處理 - 自然的流量控制

什麼是背壓?

在數據流系統中,背壓(Backpressure)是指當下游處理速度跟不上上游生產速度時,系統需要一種機制來減緩上游的生產速度,防止記憶體溢出。

Rust 異步流的背壓機制

DataFusion 的 Stream 執行模型天然支援背壓,無需額外的複雜機制:

fn poll_next(
    mut self: Pin<&mut Self>,
    cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
    // 向上游請求數據
    let input_batch = ready!(self.input.poll_next_unpin(cx));
    
    // 如果上游返回 Poll::Pending,
    // ready!() 宏會直接返回 Poll::Pending
    // 這意味著當前算子也暫停了
    
    // 處理 batch...
}

關鍵機制

  1. Poll::Pending 的傳播:當上游數據未就緒時,返回 Poll::Pending,當前算子也暫停
  2. Waker 機制:當上游數據就緒時,會喚醒(wake)等待的下游任務
  3. 按需拉取:只有在下游需要時才會呼叫 poll_next(),自然實現流量控制

實際場景:慢速網路讀取

假設我們從慢速網路讀取 Parquet 檔案:

Time 0: FilterExec.poll_next()
        ↓
        TableScan.poll_next()
        ↓ 發起網路請求
        返回 Poll::Pending (數據未到達)
        ↑ 傳播 Pending
        FilterExec 也返回 Poll::Pending

Time 1: (數據到達,Waker 喚醒任務)
        FilterExec.poll_next() 再次被呼叫
        ↓
        TableScan.poll_next()
        ↓ 數據已就緒
        返回 Poll::Ready(Some(batch))
        ↑
        FilterExec 處理 batch 並返回

這種機制確保:

  • 不會阻塞執行緒:等待 I/O 時,執行緒可以去執行其他任務
  • 記憶體安全:不會無限制地緩衝數據
  • 自動調節:快速消費者會迅速獲得數據,慢速消費者會自然限制生產速度

協作式多工(Cooperative Multitasking)

DataFusion 還實作了 CooperativeStream 來防止單一任務壟斷 CPU:

// datafusion/physical-plan/src/coop.rs
impl<T> Stream for CooperativeStream<T>
where
    T: RecordBatchStream + Unpin,
{
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // 檢查是否還有執行預算
        if self.budget == 0 {
            // 預算耗盡,主動讓出 CPU
            cx.waker().wake_by_ref();
            self.budget = YIELD_FREQUENCY;
            return Poll::Pending;
        }
        
        self.budget -= 1;
        self.inner.poll_next_unpin(cx)
    }
}

這確保即使有大量數據可處理,也會定期讓出 CPU 給其他任務,實現公平調度。

小結

今天我們深入探討了 DataFusion 的 Stream 執行模型:

  1. SendableRecordBatchStream:可在執行緒間傳遞的異步數據流,提供 Schema 資訊和批次數據
  2. Pull-based Volcano 模型:父算子主動拉取數據,實現簡潔的控制流和自然的背壓
  3. RecordBatch 流動:通過 poll_next() 方法級聯拉取,數據逐批次在算子間傳遞
  4. TaskContext:提供執行期所需的資源,包括記憶體池、函數註冊表和配置
  5. 背壓處理:利用 Rust 異步機制自動實現流量控制,防止記憶體溢出

明天我們將深入 Projection 和 Filter 算子,看看它們如何實作表達式求值和數據過濾,以及如何利用向量化執行提升性能。

參考資料

核心論文

  1. Volcano - An Extensible and Parallel Query Evaluation System (1990) - Goetz Graefe 的經典論文,奠定了現代查詢執行引擎的基礎
  2. Volcano/Iterator Model vs Push-based Execution - 比較不同執行模型的性能研究

DataFusion 官方文件

  1. DataFusion 官方文件 - SendableRecordBatchStream
  2. DataFusion 原始碼 - execution_plan.rs
  3. DataFusion 原始碼 - task.rs
  4. DataFusion 原始碼 - stream.rs

Rust 異步編程

  1. Rust Async Book - Streams
  2. Tokio Documentation - Cooperative Scheduling
  3. Futures crate - Stream trait

相關系統設計

  1. DuckDB: Push-Based Execution Model
  2. Apache Flink: Streaming Execution

上一篇
Day 16: ExecutionPlan 體系架構 Part 1 - Trait 設計
下一篇
Day 18: 基礎執行算子 - Projection 和 Filter
系列文
DataFusion 闖關攻略:30 天學習 Rust 查詢引擎之旅19
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言